use crate::logical_plan::work_space::{work_space_register_online,WorkSpaceConf};
use crate::logical_plan::work_node::conf::{WorkNodeConf};
use crate::sink::{SinkConf, sink_register_online,sink_add_input_online};
use crate::source::{SourceConf, source_register_online};
use crate::conf_init::MapResult;
use futures::executor::block_on;

#[derive(serde::Deserialize, Debug)]
struct WorkSpaceFilter {
    work_space : WorkSpaceConf,
    work_node : Vec<WorkNodeConf>,
}

#[derive(serde::Deserialize, Debug)]
struct SinkInputReg {
    sink_name : String,
    input_name : String,
}

/*
/register/work_space
*/
pub async fn register_group_dispatch_async(mode:&str, data: &[u8])->Result<(String,Option<String>),String>{
    match mode {
        "work_space"=>{
            let deserialized: WorkSpaceFilter = serde_json::from_slice(data).unwrap();

            if let MapResult::Err(e) = work_space_register_online(&deserialized.work_space, &deserialized.work_node).await{
                return Err(format!("register space err {:?}", e))
            }

            return Ok(("ok".to_string(),Some(String::from_utf8(data.to_vec()).unwrap())));
        }
        "source" => {
            let deserialized: SourceConf = serde_json::from_slice(data).unwrap();
            if let MapResult::Err(e) = source_register_online(&deserialized).await{
                return Err(format!("register source err {:?}", e));
            }
            return Ok(("ok".to_string(),Some(String::from_utf8(data.to_vec()).unwrap())));
        }
        "sink_add_input" => {
            let deserialized: SinkInputReg = serde_json::from_slice(data).unwrap();
            if let MapResult::Err(e) = sink_add_input_online(&deserialized.sink_name, &deserialized.input_name).await{
                return Err(format!("register sink input err {:?}", e));
            }

            return Ok(("ok".to_string(),Some(String::from_utf8(data.to_vec()).unwrap())));
        }
        "sink" => {
            let deserialized: SinkConf = serde_json::from_slice(data).unwrap();
            if let MapResult::Err(e) = sink_register_online(&deserialized).await{
                return Err(format!("register sink err {:?}", e));
            }
            return Ok(("ok".to_string(),Some(String::from_utf8(data.to_vec()).unwrap())));
        }
        _ => {
            return Err(format!("ok"));
        }
    }
}

pub fn register_group_dispatch_sync(mode:&str, data: &[u8])->Result<(String,Option<String>),String>{
    return block_on(register_group_dispatch_async(mode,data));
}